package com.anlu.kafka.hellokafka;

import com.anlu.kafka.config.BusiConst;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer example.
 *
 * <p>Subscribes to {@link BusiConst#HELLO_TOPIC} as a member of consumer group
 * {@code test1} and prints every record it receives to standard output.
 */
public class HelloKafkaConsumer {

    /** Broker address used to bootstrap the connection. */
    private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";

    /** Consumer group this example joins. */
    private static final String GROUP_ID = "test1";

    /** Timeout handed to every {@code poll} call; {@code KafkaConsumer.poll(long)} reads it as milliseconds. */
    private static final long POLL_TIMEOUT_MS = 500L;

    /**
     * Creates the consumer, runs the poll loop and closes the consumer afterwards.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // try-with-resources closes the consumer once the poll loop has returned.
        // A failure while constructing the consumer is not caught here and
        // propagates out of main.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties())) {
            pollAndPrint(consumer);
        }
    }

    /**
     * Builds the consumer configuration: bootstrap server, String key/value
     * deserializers and the consumer group id.
     *
     * @return a fresh {@link Properties} instance holding the four settings
     */
    private static Properties consumerProperties() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        return properties;
    }

    /**
     * Subscribes the consumer to the hello topic and prints each polled record.
     *
     * <p>The loop has no exit condition, so this method returns only after an
     * exception has been caught and its stack trace printed.
     *
     * @param consumer the consumer to subscribe and poll; it is not closed here
     */
    private static void pollAndPrint(KafkaConsumer<String, String> consumer) {
        try {
            consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(describe(record));
                    // Application-specific processing of the record would go here.
                }
            }
        } catch (Exception e) {
            // Report the failure and fall through so the caller can close the consumer.
            e.printStackTrace();
        }
    }

    /**
     * Formats a record's topic, partition, offset, key and value as one line.
     *
     * @param record the record to describe
     * @return the formatted line
     */
    private static String describe(ConsumerRecord<String, String> record) {
        return String.format("topic:%s,分区：%d,偏移量：%d," +
                        "key:%s,value:%s",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
    }
}
